{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>7</td><td>application_1561468620886_0009</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8088/proxy/application_1561468620886_0009/\">Link</a></td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8042/node/containerlogs/container_e01_1561468620886_0009_01_000001/demo_deep_learning_admin000__meb10000\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n"
     ]
    }
   ],
   "source": [
    "import numpy as np\n",
    "import random\n",
    "import pandas as pd\n",
    "from pyspark.sql import Row\n",
    "from hops import featurestore"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "area_ids = list(range(1,51))\n",
    "house_sizes = []\n",
    "house_worths = []\n",
    "house_ages = []\n",
    "house_area_ids = []\n",
    "for i in area_ids:\n",
    "    for j in list(range(1,100)):\n",
    "        house_sizes.append(abs(np.random.normal()*1000)/i)\n",
    "        house_worths.append(abs(np.random.normal()*10000)/i)\n",
    "        house_ages.append(abs(np.random.normal()*10000)/i)\n",
    "        house_area_ids.append(i)\n",
    "house_ids = list(range(len(house_area_ids)))\n",
    "houses_for_sale_data  = pd.DataFrame({\n",
    "        'area_id':house_area_ids,\n",
    "        'house_id':house_ids,\n",
    "        'house_worth': house_worths,\n",
    "        'house_age': house_ages,\n",
    "        'house_size': house_sizes\n",
    "    })\n",
    "houses_for_sale_data_spark_df = sqlContext.createDataFrame(houses_for_sale_data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+--------+------------------+------------------+------------------+\n",
      "|area_id|house_id|       house_worth|         house_age|        house_size|\n",
      "+-------+--------+------------------+------------------+------------------+\n",
      "|      1|       0|3601.8802099057352|  7795.06845896484|456.58545181313565|\n",
      "|      1|       1| 8263.477857564954|  9391.53780819243|1679.5500736138883|\n",
      "|      1|       2| 7441.333488202338| 142.7748498258131| 549.3300029403817|\n",
      "|      1|       3| 5953.851415439113| 97.83374519792382|2435.0608407543477|\n",
      "|      1|       4| 12791.24528646675|1086.1390807961527| 741.7169220300896|\n",
      "+-------+--------+------------------+------------------+------------------+\n",
      "only showing top 5 rows"
     ]
    }
   ],
   "source": [
    "houses_for_sale_data_spark_df.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- area_id: long (nullable = true)\n",
      " |-- house_id: long (nullable = true)\n",
      " |-- house_worth: double (nullable = true)\n",
      " |-- house_age: double (nullable = true)\n",
      " |-- house_size: double (nullable = true)"
     ]
    }
   ],
   "source": [
    "houses_for_sale_data_spark_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "sum_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy(\"area_id\").sum()\n",
    "count_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy(\"area_id\").count()\n",
    "sum_count_houses_for_sale_df = sum_houses_for_sale_df.join(count_houses_for_sale_df, \"area_id\")\n",
    "sum_count_houses_for_sale_df = sum_count_houses_for_sale_df \\\n",
    "    .withColumnRenamed(\"sum(house_age)\", \"sum_house_age\") \\\n",
    "    .withColumnRenamed(\"sum(house_worth)\", \"sum_house_worth\") \\\n",
    "    .withColumnRenamed(\"sum(house_size)\", \"sum_house_size\") \\\n",
    "    .withColumnRenamed(\"count\", \"num_rows\")\n",
    "def compute_average_features_house_for_sale(row):\n",
    "    avg_house_worth = row.sum_house_worth/float(row.num_rows)\n",
    "    avg_house_size = row.sum_house_size/float(row.num_rows)\n",
    "    avg_house_age = row.sum_house_age/float(row.num_rows)\n",
    "    return Row(\n",
    "        sum_house_worth=row.sum_house_worth, \n",
    "        sum_house_age=row.sum_house_age,\n",
    "        sum_house_size=row.sum_house_size,\n",
    "        area_id = row.area_id,\n",
    "        avg_house_worth = avg_house_worth,\n",
    "        avg_house_size = avg_house_size,\n",
    "        avg_house_age = avg_house_age\n",
    "       )\n",
    "houses_for_sale_features_df = sum_count_houses_for_sale_df.rdd.map(\n",
    "    lambda row: compute_average_features_house_for_sale(row)\n",
    ").toDF()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------+------------------+------------------+------------------+------------------+------------------+\n",
      "|area_id|     avg_house_age|    avg_house_size|   avg_house_worth|     sum_house_age|    sum_house_size|   sum_house_worth|\n",
      "+-------+------------------+------------------+------------------+------------------+------------------+------------------+\n",
      "|     26| 333.1296441436684| 30.61181421799333|299.42218274902274| 32979.83477022317|  3030.56960758134| 29642.79609215325|\n",
      "|     29|300.66514391677435|  28.3565826537405|265.27163263740397| 29765.84924776066|2807.3016827203096|26261.891631102993|\n",
      "|     19| 539.9744360853701| 42.61610624706339| 461.2294529010087|53457.469172451645| 4218.994518459276| 45661.71583719986|\n",
      "|     22|355.27561487081687| 37.87464176584263|377.56902176073646| 35172.28587221087|3749.5895348184204| 37379.33315431291|\n",
      "|      7|1179.1468181061875|106.41616578898419|1140.3828069179945|116735.53499251256|10535.200413109435|112897.89788488146|\n",
      "+-------+------------------+------------------+------------------+------------------+------------------+------------------+\n",
      "only showing top 5 rows"
     ]
    }
   ],
   "source": [
    "houses_for_sale_features_df.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- area_id: long (nullable = true)\n",
      " |-- avg_house_age: double (nullable = true)\n",
      " |-- avg_house_size: double (nullable = true)\n",
      " |-- avg_house_worth: double (nullable = true)\n",
      " |-- sum_house_age: double (nullable = true)\n",
      " |-- sum_house_size: double (nullable = true)\n",
      " |-- sum_house_worth: double (nullable = true)"
     ]
    }
   ],
   "source": [
    "houses_for_sale_features_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_deep_learning_admin000_featurestore\n",
      "Feature group created successfully"
     ]
    }
   ],
   "source": [
    "featurestore.create_featuregroup(\n",
    "    houses_for_sale_features_df,\n",
    "    \"houses_for_sale_featuregroup\",\n",
    "    description=\"aggregate features of houses for sale per area\",\n",
    "    descriptive_statistics=False,\n",
    "    feature_correlation=False,\n",
    "    feature_histograms=False,\n",
    "    cluster_analysis=False\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
